 1.大数据高速计算引擎Spark（上）之Spark Core中RDD编程下算子综合应用案例
   
   1).WordCount - scala
   备注：打包上传服务器运行
package cn.lagou.sparkcore

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object ScalaWordCount {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkContext; the master is supplied via spark-submit
    val conf = new SparkConf().setAppName("ScalaWordCount")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    // 2. Read the input file; the path comes from the first program argument
    val lines: RDD[String] = sc.textFile(args(0))

    // 3. Transformations: split lines into words, pair each with 1, sum per word
    val words: RDD[String] = lines.flatMap(_.split("\\s+"))
    val pairs: RDD[(String, Int)] = words.map(word => (word, 1))
    val counts: RDD[(String, Int)] = pairs.reduceByKey(_ + _)

    // 4. Print every (word, count) pair (runs on the executors on a cluster)
    counts.foreach(println)

    // 5. Release cluster resources
    sc.stop()
    // 6. Package the jar and submit with spark-submit, e.g.:
    // spark-submit --master local[*] --class cn.lagou.sparkcore.ScalaWordCount \
    //   original-LagouBigData-1.0-SNAPSHOT.jar /wcinput/*
    // spark-submit --master yarn --class cn.lagou.sparkcore.ScalaWordCount \
    //   original-LagouBigData-1.0-SNAPSHOT.jar /wcinput/*
  }
}
   2).WordCount - java
   Spark提供了：Scala、Java、Python、R语言的API；
   对 Scala 和 Java 语言的支持最好；
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

public class JavaWordCount {
    public static void main(String[] args) {
        // 1. Create the Spark context (local mode, all available cores)
        SparkConf conf =
                new SparkConf().setAppName("JavaWordCount").setMaster("local[*]");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        jsc.setLogLevel("WARN");
        // 2. Load the input file as an RDD of lines.
        // Fixed: the original path used quadruple backslashes ("\\\\"), which
        // produces literal double backslashes in the runtime path.
        JavaRDD<String> lines =
                jsc.textFile("file:///D:\\jdbc_work\\Lagou_BigData\\data\\wc.txt");
        // 3. Transformations: split into words, map to (word, 1), sum per key
        JavaRDD<String> words =
                lines.flatMap(line -> Arrays.stream(line.split("\\s+")).iterator());
        JavaPairRDD<String, Integer> wordsMap =
                words.mapToPair(word -> new Tuple2<>(word, 1));
        JavaPairRDD<String, Integer> result =
                wordsMap.reduceByKey(Integer::sum);
        // 4. Print every (word, count) pair
        result.foreach(elem -> System.out.println(elem));
        // 5. Release resources
        jsc.stop();
    }
}

   备注：
       Spark入口点：JavaSparkContext
       Value-RDD：JavaRDD；key-value RDD：JavaPairRDD
       JavaRDD 和 JavaPairRDD转换
           JavaRDD => JavaPairRDD：通过mapToPair函数
           JavaPairRDD => JavaRDD：通过map函数转换
       lambda表达式使用 ->
   3).计算圆周率
   蒙特卡洛法：单位圆的面积为 pi，外接正方形的面积为 4；总射击次数为 N，
   落在单位圆内的次数为 n，则 4/pi ≈ N/n，即 pi ≈ 4*n/N
package cn.lagou.sparkcore

import org.apache.spark.{SparkConf, SparkContext}

import scala.math.random

object SparkPi {
  def main(args: Array[String]): Unit = {
    // 1. SparkContext; the app name is derived from this object's class name
    //    (.init strips the trailing '$' of the Scala object class)
    val conf = new SparkConf()
      .setAppName(this.getClass.getCanonicalName.init)
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    // Partition count (first CLI arg, default 10) and total number of samples
    val slices = if (args.length > 0) args(0).toInt else 10
    val N = 100000000

    // 2. Monte Carlo sampling: count random points that fall inside the unit circle
    val hits = sc.makeRDD(1 to N, slices)
      .map { _ =>
        val x = random
        val y = random
        if (x * x + y * y <= 1) 1 else 0
      }
      .sum()

    // 3. Area ratio circle/square = pi/4, hence pi ≈ 4 * hits / N
    val pi = 4.0 * hits / N
    println(s"pi = $pi")
    // 4. Shut down
    sc.stop()
  }
}

   4).广告数据统计
   数据格式：timestamp province city userid adid 时间点 省份 城市 用户 广告
   需求：1、统计每一个省份点击TOP3的广告ID 2、 统计每一个省份每一个小时的
TOP3广告ID

package cn.lagou.sparkcore

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.joda.time.DateTime

object Adstat {
  def main(args: Array[String]): Unit = {
    // 1. SparkContext; app name derived from this object's class name
    val conf = new SparkConf()
      .setAppName(this.getClass.getCanonicalName.init)
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")
    // Size of the "top N" lists
    val topN = 3
    // 2. Raw log lines: timestamp province city userid adid
    val lines: RDD[String] =
      sc.textFile("file:///D:\\jdbc_work\\Lagou_BigData\\data\\advert.log")
    // 3. RDD transformations
    // Requirement 1: top-3 ad ids per province, ranked by click count
    val provinceAd: RDD[(String, String)] = lines.map { line =>
      val fields = line.split("\\s+")
      (fields(1), fields(4)) // (province, adid)
    }

    // Count clicks per (province, adid)
    val clicksByProvinceAd: RDD[((String, String), Int)] = provinceAd
      .map { case (province, adid) => ((province, adid), 1) }
      .reduceByKey(_ + _)
    // Regroup by province and keep the three most-clicked ad ids
    clicksByProvinceAd
      .map { case ((province, adid), count) => (province, (adid, count)) }
      .groupByKey()
      .mapValues(_.toList.sortWith(_._2 > _._2).take(topN).map(_._1).mkString(":"))
      .foreach(println)

    // Requirement 2: top-3 ad ids per (province, hour)
    lines
      .map { line =>
        val fields = line.split("\\s+")
        ((getHour(fields(0)), fields(1), fields(4)), 1)
      }
      .reduceByKey(_ + _)
      .map { case ((hour, province, adid), count) => ((province, hour), (adid, count)) }
      .groupByKey()
      .mapValues(_.toList.sortWith(_._2 > _._2).take(topN).map(_._1).mkString(":"))
      .foreach(println)

    // 5. Shut down
    sc.stop()
  }

  /** Hour-of-day (0-23) of an epoch-millis timestamp string (Joda-Time is thread-safe). */
  def getHour(str: String): Int = {
    new DateTime(str.toLong).getHourOfDay
  }
}


<dependency>
    <groupId>joda-time</groupId>
    <artifactId>joda-time</artifactId>
    <version>2.9.7</version>
</dependency>
   Joda-Time 在 Java 8 出现前的很长时间内是 Java 中日期时间处理的事实标准，用来弥补 JDK 的不足。

   Joda 类具有不可变性，它们的实例无法被修改。（不可变类的一个优点就是它们是线程安全的）

   在 Spark Core 程序中使用时间日期类型时，不要使用 Java 8 以前的时间日期类型，线程不安全。
   5).找共同好友
   原始数据：
100, 200 300 400 500 600
200, 100 300 400
300, 100 200 400 500
400, 100 200 300
500, 100 300
600, 100
   第一列表示用户，后面的表示该用户的好友
   要求：
   1、 查找两两用户的共同好友
   2、 最后的结果按前两个id号有序排序
package cn.lagou.sparkcore

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object FindFriends {
  def main(args: Array[String]): Unit = {
    // SparkContext
    val conf = new SparkConf()
      .setAppName(this.getClass.getCanonicalName.init)
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val lines: RDD[String] =
      sc.textFile("file:///D:\\jdbc_work\\Lagou_BigData\\data\\fields.dat")
    // Parse "user, f1 f2 f3 ..." into (userId, friends)
    val friendsRDD: RDD[(String, Array[String])] = lines.map { line =>
      val parts = line.split(",")
      (parts(0).trim, parts(1).trim.split("\\s+"))
    }

    // Approach 1: cartesian product of all user pairs, then intersect friend lists.
    // Easy to come up with, but inefficient — every pair is materialized twice.
    friendsRDD
      .cartesian(friendsRDD)
      .filter { case ((id1, _), (id2, _)) => id1 < id2 } // keep each unordered pair once
      .map { case ((id1, friends1), (id2, friends2)) =>
        ((id1, id2), friends1.intersect(friends2).sorted.toBuffer)
      }
      .sortByKey()
      .collect
      .foreach(println)

    println("***************************************************************")
    // Approach 2: no cartesian product — more efficient.
    // Every 2-combination of one user's friend list is a pair of users that
    // shares that user as a common friend; union those user sets per pair.
    friendsRDD
      .flatMapValues(_.combinations(2))
      .map { case (user, pair) => (pair.mkString(" & "), Set(user)) }
      .reduceByKey(_ | _)
      .sortByKey()
      .collect
      .foreach(println)

    // Key ops: flatMapValues / combinations / data reshaping / reduceByKey / set union

    // Shut down
    sc.stop()
  }

}

   
// Two overlapping integer sets used to demonstrate the set operators
val s1 = (1 to 5).toSet
val s2 = (3 to 8).toSet
// Intersection — elements in both (equivalent to s1.intersect(s2))
println(s1 & s2)
// Union — elements in either (equivalent to s1.union(s2))
println(s1 | s2)
// Difference — elements of s1 not in s2 (equivalent to s1.diff(s2))
println(s1 &~ s2)
   6).Super WordCount
   要求：将单词全部转换为小写，去除标点符号(难)，去除停用词(难)；最后按照count 
值降序保存到文件，同时将全部结果保存到MySQL(难)；标点符号和停用词可以自定义。
   停用词：语言中包含很多功能词。与其他词相比，功能词没有什么实际含义。最普遍
的功能词是[限定词](the、a、an、that、those)，介词(on、in、to、from、over等)
、代词、数量词等。
   
   Array[(String, Int)] => scala jdbc => MySQL
   
package cn.lagou.sparkcore

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SuperWordCount1 {
  // Function words excluded from the counts
  private val stopWords =
    "in on to from by a an the is are were was i we you your he his some any of as can it each"
      .split("\\s+")
  // Punctuation characters stripped from each token
  private val punctuation = "[\\)\\.,:;’!\\?]"

  def main(args: Array[String]): Unit = {
    // 1. SparkContext
    val conf =
      new SparkConf().setAppName(this.getClass.getCanonicalName.init).setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")
    // 2. Lowercase every token, strip punctuation, drop stop words and empty
    //    tokens, then count and print sorted by frequency (descending)
    val lines: RDD[String] =
      sc.textFile("file:///D:\\jdbc_work\\Lagou_BigData\\data\\swc.dat")
    lines
      .flatMap(_.split("\\s+"))
      .map(_.toLowerCase)
      .map(_.replaceAll(punctuation, ""))
      .filter(w => w.trim.nonEmpty && !stopWords.contains(w))
      .map((_, 1))
      .reduceByKey(_ + _)
      .sortBy(_._2, ascending = false)
      .collect
      .foreach(println)

    // 3. Shut down
    sc.stop()
  }
}


   引入依赖
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.44</version>
</dependency>

package cn.lagou.sparkcore

import java.sql.{Connection, DriverManager, PreparedStatement}

object JDBCDemo {
  def main(args: Array[String]): Unit = {
    // Sample result set: each word paired with its index
    val data =
      "hadoop spark java scala hbase hive sqoop hue tez atlas datax grinffin zk kafka"
    val result: Array[(String, Int)] = data.split("\\s+").zipWithIndex
    // Connection parameters
    val url =
      "jdbc:mysql://linux123:3306/" +
        "ebiz?useUnicode=true&characterEncoding=utf-8&useSSL=false"
    val username = "hive"
    val password = "12345678"
    // Insert every pair through one shared PreparedStatement
    val sql = "insert into wordcount values (?, ?)"
    var conn: Connection = null
    var stmt: PreparedStatement = null
    try {
      conn = DriverManager.getConnection(url, username, password)
      stmt = conn.prepareStatement(sql)
      for ((word, idx) <- result) {
        stmt.setString(1, word)
        stmt.setInt(2, idx)
        stmt.executeUpdate()
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      // Close in reverse order; null checks guard against a failed getConnection
      if (stmt != null) stmt.close()
      if (conn != null) conn.close()
    }
  }

}

   create table wordcount(word varchar(30), count int);
   未优化的程序：使用 foreach 保存数据，要创建大量的连接
package cn.lagou.sparkcore

import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SuperWordCount2 {
  // Function words excluded from the counts
  private val stopWords =
    "in on to from by a an the is are were was i we you your he his some any of as can it each"
      .split("\\s+")
  // Punctuation characters stripped from each token
  private val punctuation = "[\\)\\.,:;’!\\?]"
  // MySQL connection parameters
  private val url =
    "jdbc:mysql://linux123:3306/" +
      "ebiz?useUnicode=true&characterEncoding=utf-8&useSSL=false"
  private val username = "hive"
  private val password = "12345678"

  def main(args: Array[String]): Unit = {
    // 1. SparkContext
    val conf =
      new SparkConf().setAppName(this.getClass.getCanonicalName.init).setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")
    // 2. Lowercase, strip punctuation, drop stop words and empty tokens,
    //    count and sort by frequency (descending)
    val lines: RDD[String] =
      sc.textFile("file:///D:\\jdbc_work\\Lagou_BigData\\data\\swc.dat")
    val resultRDD: RDD[(String, Int)] = lines
      .flatMap(_.split("\\s+"))
      .map(_.toLowerCase)
      .map(_.replaceAll(punctuation, ""))
      .filter(w => w.trim.nonEmpty && !stopWords.contains(w))
      .map((_, 1))
      .reduceByKey(_ + _)
      .sortBy(_._2, ascending = false)

    // 3. Save the result as text files
    resultRDD.saveAsTextFile("file:///D:\\jdbc_work\\Lagou_BigData\\data\\superwc")
    // 4. Save to MySQL with foreach — deliberately unoptimized: a new JDBC
    //    connection is opened for every single record
    resultRDD.foreach { case (word, count) =>
      val sql = "insert into wordcount values (?, ?)"
      var conn: Connection = null
      var stmt: PreparedStatement = null
      try {
        conn = DriverManager.getConnection(url, username, password)
        stmt = conn.prepareStatement(sql)
        stmt.setString(1, word)
        stmt.setInt(2, count)
        stmt.executeUpdate()
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        if (stmt != null) stmt.close()
        if (conn != null) conn.close()
      }
    }
    // 5. Shut down
    sc.stop()
  }
}


   优化后的程序：使用 foreachPartition 保存数据，一个分区创建一个连接；cache RDD
package cn.lagou.sparkcore

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SuperWordCount3 {
  // Function words excluded from the counts
  private val stopWords =
    "in on to from by a an the is are were was i we you your he his some any of as can it each"
      .split("\\s+")
  // Punctuation characters stripped from each token
  private val punctuation = "[\\)\\.,:;’!\\?]"
  // MySQL connection parameters
  private val url =
    "jdbc:mysql://linux123:3306/" +
      "ebiz?useUnicode=true&characterEncoding=utf-8&useSSL=false"
  private val username = "hive"
  private val password = "12345678"

  def main(args: Array[String]): Unit = {
    // 1. SparkContext
    val conf = new SparkConf()
      .setAppName(this.getClass.getCanonicalName.init)
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")
    // 2. Lowercase, strip punctuation, drop stop words and empty tokens,
    //    count and sort by frequency (descending)
    val lines: RDD[String] =
      sc.textFile("file:///D:\\jdbc_work\\Lagou_BigData\\data\\swc.dat")
    val resultRDD: RDD[(String, Int)] = lines
      .flatMap(_.split("\\s+"))
      .map(_.toLowerCase)
      .map(_.replaceAll(punctuation, ""))
      .filter(w => w.trim.nonEmpty && !stopWords.contains(w))
      .map((_, 1))
      .reduceByKey(_ + _)
      .sortBy(_._2, ascending = false)

    // 3. Save the result as text files
    resultRDD.saveAsTextFile("file:///D:\\jdbc_work\\Lagou_BigData\\data\\superwc")
    // 4. Save to MySQL — one connection per partition (the optimized version)
    resultRDD.foreachPartition(saveAsMySQL)
    // 5. Shut down
    sc.stop()
  }

  /** Writes one partition's (word, count) pairs to MySQL over a single connection. */
  def saveAsMySQL(iter: Iterator[(String, Int)]): Unit = {
    val sql = "insert into wordcount values (?, ?)"
    var conn: Connection = null
    var stmt: PreparedStatement = null
    try {
      conn = DriverManager.getConnection(url, username, password)
      stmt = conn.prepareStatement(sql)
      for ((word, count) <- iter) {
        stmt.setString(1, word)
        stmt.setInt(2, count)
        stmt.executeUpdate()
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if (stmt != null) stmt.close()
      if (conn != null) conn.close()
    }
  }
}
// 在SparkSQL中有内建的访问MySQL的方法，调用非常方便
// SparkCore、SQL不支持的外部存储


   备注：
        SparkSQL有方便的读写MySQL的方法，给参数直接调用即可；
        但掌握以上方法非常有必要，因为 SparkSQL 并不支持所有类型的数据库
	